{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Tools for NLP\n",
    "\n",
    "There are lots of feature transformations that need to be done on text data to get it to a point that machine learning algorithms can understand. Luckily, Spark has placed the most important ones in convienent Feature Transformer calls. \n",
    "\n",
    "Let's go over them before jumping into the project."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.appName('nlp').getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Tokenizer\n",
    "<p><a href=\"http://en.wikipedia.org/wiki/Lexical_analysis#Tokenization\">Tokenization</a> is the process of taking text (such as a sentence) and breaking it into individual terms (usually words).  A simple <a href=\"api/scala/index.html#org.apache.spark.ml.feature.Tokenizer\">Tokenizer</a> class provides this functionality.  The example below shows how to split sentences into sequences of words.</p>\n",
    "\n",
    "<p><a href=\"api/scala/index.html#org.apache.spark.ml.feature.RegexTokenizer\">RegexTokenizer</a> allows more\n",
    " advanced tokenization based on regular expression (regex) matching.\n",
    " By default, the parameter &#8220;pattern&#8221; (regex, default: <code>\"\\\\s+\"</code>) is used as delimiters to split the input text.\n",
    " Alternatively, users can set parameter &#8220;gaps&#8221; to false indicating the regex &#8220;pattern&#8221; denotes\n",
    " &#8220;tokens&#8221; rather than splitting gaps, and find all matching occurrences as the tokenization result.</p>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Tokenizer, RegexTokenizer\n",
    "from pyspark.sql.functions import col, udf\n",
    "from pyspark.sql.types import IntegerType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sentenceDataFrame = spark.createDataFrame([\n",
    "    (0, \"Hi I heard about Spark\"),\n",
    "    (1, \"I wish Java could use case classes\"),\n",
    "    (2, \"Logistic,regression,models,are,neat\")\n",
    "], [\"id\", \"sentence\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+\n",
      "| id|            sentence|\n",
      "+---+--------------------+\n",
      "|  0|Hi I heard about ...|\n",
      "|  1|I wish Java could...|\n",
      "|  2|Logistic,regressi...|\n",
      "+---+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentenceDataFrame.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------------------+------------------------------------------+------+\n",
      "|sentence                           |words                                     |tokens|\n",
      "+-----------------------------------+------------------------------------------+------+\n",
      "|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |\n",
      "|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |\n",
      "|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |\n",
      "+-----------------------------------+------------------------------------------+------+\n",
      "\n",
      "+-----------------------------------+------------------------------------------+------+\n",
      "|sentence                           |words                                     |tokens|\n",
      "+-----------------------------------+------------------------------------------+------+\n",
      "|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |\n",
      "|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |\n",
      "|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |\n",
      "+-----------------------------------+------------------------------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenizer = Tokenizer(inputCol=\"sentence\", outputCol=\"words\")\n",
    "\n",
    "regexTokenizer = RegexTokenizer(inputCol=\"sentence\", outputCol=\"words\", pattern=\"\\\\W\")\n",
    "# alternatively, pattern=\"\\\\w+\", gaps(False)\n",
    "\n",
    "countTokens = udf(lambda words: len(words), IntegerType())\n",
    "\n",
    "tokenized = tokenizer.transform(sentenceDataFrame)\n",
    "tokenized.select(\"sentence\", \"words\")\\\n",
    "    .withColumn(\"tokens\", countTokens(col(\"words\"))).show(truncate=False)\n",
    "\n",
    "regexTokenized = regexTokenizer.transform(sentenceDataFrame)\n",
    "regexTokenized.select(\"sentence\", \"words\") \\\n",
    "    .withColumn(\"tokens\", countTokens(col(\"words\"))).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "## Stop Words Removal\n",
    "\n",
    "<p><a href=\"https://en.wikipedia.org/wiki/Stop_words\">Stop words</a> are words which\n",
    "should be excluded from the input, typically because the words appear\n",
    "frequently and don&#8217;t carry as much meaning.</p>\n",
    "\n",
    "<p><code>StopWordsRemover</code> takes as input a sequence of strings (e.g. the output\n",
    "of a <a href=\"ml-features.html#tokenizer\">Tokenizer</a>) and drops all the stop\n",
    "words from the input sequences. The list of stopwords is specified by\n",
    "the <code>stopWords</code> parameter. Default stop words for some languages are accessible \n",
    "by calling <code>StopWordsRemover.loadDefaultStopWords(language)</code>, for which available \n",
    "options are &#8220;danish&#8221;, &#8220;dutch&#8221;, &#8220;english&#8221;, &#8220;finnish&#8221;, &#8220;french&#8221;, &#8220;german&#8221;, &#8220;hungarian&#8221;, \n",
    "&#8220;italian&#8221;, &#8220;norwegian&#8221;, &#8220;portuguese&#8221;, &#8220;russian&#8221;, &#8220;spanish&#8221;, &#8220;swedish&#8221; and &#8220;turkish&#8221;. \n",
    "A boolean parameter <code>caseSensitive</code> indicates if the matches should be case sensitive \n",
    "(false by default).</p>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------------------------+--------------------+\n",
      "|id |raw                         |filtered            |\n",
      "+---+----------------------------+--------------------+\n",
      "|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |\n",
      "|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|\n",
      "+---+----------------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StopWordsRemover\n",
    "\n",
    "sentenceData = spark.createDataFrame([\n",
    "    (0, [\"I\", \"saw\", \"the\", \"red\", \"balloon\"]),\n",
    "    (1, [\"Mary\", \"had\", \"a\", \"little\", \"lamb\"])\n",
    "], [\"id\", \"raw\"])\n",
    "\n",
    "remover = StopWordsRemover(inputCol=\"raw\", outputCol=\"filtered\")\n",
    "remover.transform(sentenceData).show(truncate=False)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## n-grams\n",
    "\n",
    "An n-gram is a sequence of nn tokens (typically words) for some integer nn. The NGram class can be used to transform input features into nn-grams.\n",
    "\n",
    "<p><code>NGram</code> takes as input a sequence of strings (e.g. the output of a <a href=\"ml-features.html#tokenizer\">Tokenizer</a>).  The parameter <code>n</code> is used to determine the number of terms in each $n$-gram. The output will consist of a sequence of $n$-grams where each $n$-gram is represented by a space-delimited string of $n$ consecutive words.  If the input sequence contains fewer than <code>n</code> strings, no output is produced.</p>\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------------------------------------+\n",
      "|ngrams                                                            |\n",
      "+------------------------------------------------------------------+\n",
      "|[Hi I, I heard, heard about, about Spark]                         |\n",
      "|[I wish, wish Java, Java could, could use, use case, case classes]|\n",
      "|[Logistic regression, regression models, models are, are neat]    |\n",
      "+------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import NGram\n",
    "\n",
    "wordDataFrame = spark.createDataFrame([\n",
    "    (0, [\"Hi\", \"I\", \"heard\", \"about\", \"Spark\"]),\n",
    "    (1, [\"I\", \"wish\", \"Java\", \"could\", \"use\", \"case\", \"classes\"]),\n",
    "    (2, [\"Logistic\", \"regression\", \"models\", \"are\", \"neat\"])\n",
    "], [\"id\", \"words\"])\n",
    "\n",
    "ngram = NGram(n=2, inputCol=\"words\", outputCol=\"ngrams\")\n",
    "\n",
    "ngramDataFrame = ngram.transform(wordDataFrame)\n",
    "ngramDataFrame.select(\"ngrams\").show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_______\n",
    "# Feature Extractors\n",
    "_______"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2 id=\"tf-idf\">TF-IDF</h2>\n",
    "\n",
    "<p><a href=\"http://en.wikipedia.org/wiki/Tf%E2%80%93idf\">Term frequency-inverse document frequency (TF-IDF)</a> \n",
    "is a feature vectorization method widely used in text mining to reflect the importance of a term \n",
    "to a document in the corpus. Denote a term by <code>$t$</code>, a document by  d , and the corpus by D.\n",
    "Term frequency <code>$TF(t, d)$</code> is the number of times that term <code>$t$</code> appears in document <code>$d$</code>, while \n",
    "document frequency <code>$DF(t, D)$</code> is the number of documents that contains term <code>$t$</code>. If we only use \n",
    "term frequency to measure the importance, it is very easy to over-emphasize terms that appear very \n",
    "often but carry little information about the document, e.g. &#8220;a&#8221;, &#8220;the&#8221;, and &#8220;of&#8221;. If a term appears \n",
    "very often across the corpus, it means it doesn&#8217;t carry special information about a particular document.\n",
    "Inverse document frequency is a numerical measure of how much information a term provides:\n",
    "\n",
    "$$ IDF(t, D) = \\log \\frac{|D| + 1}{DF(t, D) + 1} $$\n",
    "\n",
    "where |D| is the total number of documents in the corpus. Since logarithm is used, if a term \n",
    "appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid \n",
    "dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:\n",
    "$$ TFIDF(t, d, D) = TF(t, d) \\cdot IDF(t, D). $$\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+\n",
      "|label|            sentence|\n",
      "+-----+--------------------+\n",
      "|  0.0|Hi I heard about ...|\n",
      "|  0.0|I wish Java could...|\n",
      "|  1.0|Logistic regressi...|\n",
      "+-----+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n",
    "\n",
    "sentenceData = spark.createDataFrame([\n",
    "    (0.0, \"Hi I heard about Spark\"),\n",
    "    (0.0, \"I wish Java could use case classes\"),\n",
    "    (1.0, \"Logistic regression models are neat\")\n",
    "], [\"label\", \"sentence\"])\n",
    "\n",
    "sentenceData.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+--------------------+\n",
      "|label|            sentence|               words|\n",
      "+-----+--------------------+--------------------+\n",
      "|  0.0|Hi I heard about ...|[hi, i, heard, ab...|\n",
      "|  0.0|I wish Java could...|[i, wish, java, c...|\n",
      "|  1.0|Logistic regressi...|[logistic, regres...|\n",
      "+-----+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenizer = Tokenizer(inputCol=\"sentence\", outputCol=\"words\")\n",
    "wordsData = tokenizer.transform(sentenceData)\n",
    "wordsData.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+\n",
      "|label|            features|\n",
      "+-----+--------------------+\n",
      "|  0.0|(20,[0,5,9,17],[0...|\n",
      "|  0.0|(20,[2,7,9,13,15]...|\n",
      "|  1.0|(20,[4,6,13,15,18...|\n",
      "+-----+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "hashingTF = HashingTF(inputCol=\"words\", outputCol=\"rawFeatures\", numFeatures=20)\n",
    "featurizedData = hashingTF.transform(wordsData)\n",
    "# alternatively, CountVectorizer can also be used to get term frequency vectors\n",
    "\n",
    "idf = IDF(inputCol=\"rawFeatures\", outputCol=\"features\")\n",
    "idfModel = idf.fit(featurizedData)\n",
    "rescaledData = idfModel.transform(featurizedData)\n",
    "\n",
    "rescaledData.select(\"label\", \"features\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## CountVectorizer\n",
    "CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.\n",
    "\n",
    "During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------------+-------------------------+\n",
      "|id |words          |features                 |\n",
      "+---+---------------+-------------------------+\n",
      "|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|\n",
      "|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|\n",
      "+---+---------------+-------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import CountVectorizer\n",
    "\n",
    "# Input data: Each row is a bag of words with a ID.\n",
    "df = spark.createDataFrame([\n",
    "    (0, \"a b c\".split(\" \")),\n",
    "    (1, \"a b b c a\".split(\" \"))\n",
    "], [\"id\", \"words\"])\n",
    "\n",
    "# fit a CountVectorizerModel from the corpus.\n",
    "cv = CountVectorizer(inputCol=\"words\", outputCol=\"features\", vocabSize=3, minDF=2.0)\n",
    "\n",
    "model = cv.fit(df)\n",
    "\n",
    "result = model.transform(df)\n",
    "result.show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
